# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Functional tests for the SyncStorage server protocol.

This file runs tests to ensure the correct operation of the server
as specified in:

    http://docs.services.mozilla.com/storage/apis-1.5.html

If there's an aspect of that spec that's not covered by a test in this file,
consider it a bug.

"""

import pytest

import re
import json
import time
import random
import string
import urllib
import webtest
import contextlib

# import math
import simplejson

from pyramid.interfaces import IAuthenticationPolicy
from webtest.app import AppError

from tools.integration_tests.test_support import StorageFunctionalTestCase

import tokenlib


class ConflictError(Exception):
    """Exception type named for conflict conditions.

    Adds nothing to ``Exception``; it exists only as a distinct type.
    """


class BackendError(Exception):
    """Exception type named for backend failures.

    Adds nothing to ``Exception``; it exists only as a distinct type.
    """


# Numeric Weave error codes; the inline notes give the meaning of each.
WEAVE_INVALID_WBO = 8  # Invalid Weave Basic Object
WEAVE_SIZE_LIMIT_EXCEEDED = 17  # Size limit exceeded

# NOTE(review): presumably the maximum number of ids accepted in one batch
# request; not referenced in the visible part of this file -- confirm.
BATCH_MAX_IDS = 100


def get_limit_config(request, limit):
    """Look up a named size limit in the request's registry settings.

    The value is read from ``request.registry.settings`` under the key
    ``"storage." + limit``; a missing key raises ``KeyError``.
    """
    settings = request.registry.settings
    setting_name = "storage." + limit
    return settings[setting_name]


def json_dumps(value):
    """Serialize *value* to JSON text via simplejson with ``use_decimal=True``."""
    encoded = simplejson.dumps(value, use_decimal=True)
    return encoded


def json_loads(value):
    """Parse JSON text via simplejson with ``use_decimal=True``."""
    decoded = simplejson.loads(value, use_decimal=True)
    return decoded


# 500-character filler payload used by the storage tests.
_PLD = "*" * 500
# Alphabet that randtext() draws from: ASCII letters followed by digits.
_ASCII = string.ascii_letters + string.digits


def randtext(size=10):
    """Return a random string of *size* characters drawn from ``_ASCII``."""
    picked = []
    for _ in range(size):
        picked.append(random.choice(_ASCII))
    return "".join(picked)


@pytest.mark.usefixtures("setup_server_local_testing")
class TestStorage(StorageFunctionalTestCase):
    """Storage testcases that only use the web API.

    These tests are suitable for running against both in-process and live
    external web servers.
    """

    def setUp(self):
        """Prepare a clean slate for each test.

        Runs the parent ``setUp``, points ``self.root`` at the 1.5 API path
        for the current ``self.user_id``, then issues a DELETE on that root
        so no data from an earlier test is left behind.
        """
        super(TestStorage, self).setUp()
        user_root = "/1.5/%d" % (self.user_id,)
        self.root = user_root

        self.retry_delete(user_root)

    @contextlib.contextmanager
    def _switch_user(self):
        """Temporarily run the test as another user.

        Enters the parent class's ``_switch_user`` context manager and, while
        it is active, recomputes ``self.root`` from ``self.user_id`` (which
        the parent context is expected to have changed). The previous
        ``self.root`` is always restored on exit, whether the body succeeded
        or raised.
        """
        saved_root = self.root
        try:
            with super(TestStorage, self)._switch_user():
                self.root = "/1.5/%d" % (self.user_id,)
                yield
        finally:
            self.root = saved_root

    def retry_post_json(self, *args, **kwargs):
        """Helper wrapper for any POST operation."""
        return self._retry_send(self.app.post_json, *args, **kwargs)

    def retry_put_json(self, *args, **kwargs):
        """Helper wrapper for any PUT operation."""
        return self._retry_send(self.app.put_json, *args, **kwargs)

    def retry_delete(self, *args, **kwargs):
        """Helper wrapper for any DELETE operation."""
        return self._retry_send(self.app.delete, *args, **kwargs)

    def _retry_send(self, func, *args, **kwargs):
        try:
            # Try to call underlying webtest method with args.
            return func(*args, **kwargs) # Generic callable
        except webtest.AppError as ex:
            # If non-200 resp, we want to retry as may be transient 
            # status. If 409 (conflict) or 503 (Service Unavailable)
            #  are not present, err non-transient and we re-raise, 
            # no retry.
            if "409 " not in ex.args[0] and "503 " not in ex.args[0]:
                raise ex
            time.sleep(0.01)
            return func(*args, **kwargs)

    def test_get_info_collections(self):
        """Check that /info/collections reports each collection's timestamp.

        Two collections are populated, the listing is compared against the
        ``modified`` values returned by the POSTs, and then one collection is
        updated to confirm that only its timestamp moves forward.
        """
        info_url = self.root + "/info/collections"
        col1_url = self.root + "/storage/xxx_col1"
        col2_url = self.root + "/storage/xxx_col2"

        # xxx_col1 gets 3 items, xxx_col2 gets 5 items.
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(3)]
        ts1 = self.retry_post_json(col1_url, bsos).json["modified"]
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
        ts2 = self.retry_post_json(col2_url, bsos).json["modified"]

        # Only those two collections should appear in the listing.
        listing = self.app.get(info_url).json
        self.assertEqual(sorted(listing), ["xxx_col1", "xxx_col2"])
        self.assertEqual(listing["xxx_col1"], ts1)
        self.assertEqual(listing["xxx_col2"], ts2)

        # Update two items in xxx_col2; its timestamp must strictly increase.
        bsos = [{"id": str(i).zfill(2), "payload": "yyy"} for i in range(2)]
        resp = self.retry_post_json(col2_url, bsos)
        # assertLess reports both values on failure, unlike assertTrue(a < b).
        self.assertLess(ts2, resp.json["modified"])
        ts2 = resp.json["modified"]

        listing = self.app.get(info_url).json
        self.assertEqual(sorted(listing), ["xxx_col1", "xxx_col2"])
        self.assertEqual(listing["xxx_col1"], ts1)
        self.assertEqual(listing["xxx_col2"], ts2)

    def test_get_collection_count(self):
        """Check that /info/collection_counts reports per-collection item counts."""
        # Populate xxx_col1 with 3 items and xxx_col2 with 5 items.
        for name, size in (("xxx_col1", 3), ("xxx_col2", 5)):
            bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(size)]
            self.retry_post_json(self.root + "/storage/" + name, bsos)
        # The counts reported back should match what was uploaded.
        counts = self.app.get(self.root + "/info/collection_counts").json
        self.assertEqual(len(counts), 2)
        self.assertEqual(counts["xxx_col1"], 3)
        self.assertEqual(counts["xxx_col2"], 5)

    def test_bad_cache(self):
        """Regression test for #637332: new collections must show up in the listing.

        The collection name <-> id mapping is temporarily cached to save a
        few requests, but that cache should be purged when a new collection
        is added.
        """
        info_url = self.root + "/info/collections"

        # Step 1: note how many collections are listed up front.
        count_before = len(self.app.get(info_url).json)

        # Step 2: create a new collection by storing one item in it.
        self.retry_put_json(
            self.root + "/storage/xxxx/125", {"id": "125", "payload": _PLD}
        )

        # Step 3: the listing should now contain exactly one more entry.
        count_after = len(self.app.get(info_url).json)
        self.assertEqual(count_after, count_before + 1)

    def test_get_collection_only(self):
        """Exercise the query parameters accepted by a collection GET.

        Covers, in order: a nonexistent collection, an unfiltered listing,
        then the ``ids``, ``newer``, ``older``, ``full``, ``limit``,
        ``offset`` and ``sort`` parameters. The collection is deleted and
        repopulated between sections, so the statements depend on running in
        exactly this order.
        """
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
        self.retry_post_json(self.root + "/storage/xxx_col2", bsos)

        # non-existent collections appear as empty
        resp = self.app.get(self.root + "/storage/nonexistent")
        res = resp.json
        self.assertEqual(res, [])

        # try just getting all items at once.
        resp = self.app.get(self.root + "/storage/xxx_col2")
        res = resp.json
        res.sort()
        self.assertEqual(res, ["00", "01", "02", "03", "04"])
        self.assertEqual(int(resp.headers["X-Weave-Records"]), 5)

        # trying various filters

        # "ids"
        # Returns the ids for objects in the collection that are in the
        # provided comma-separated list.
        res = self.app.get(self.root + "/storage/xxx_col2?ids=01,03,17")
        res = res.json
        res.sort()
        self.assertEqual(res, ["01", "03"])

        # "newer"
        # Returns only ids for objects in the collection that have been last
        # modified after the timestamp given.

        self.retry_delete(self.root + "/storage/xxx_col2")

        bso = {"id": "128", "payload": "x"}
        res = self.retry_put_json(self.root + "/storage/xxx_col2/128", bso)
        ts1 = float(res.headers["X-Last-Modified"])

        bso = {"id": "129", "payload": "x"}
        res = self.retry_put_json(self.root + "/storage/xxx_col2/129", bso)
        ts2 = float(res.headers["X-Last-Modified"])

        self.assertTrue(ts1 < ts2)

        # The bound is exclusive: newer=ts1 omits the item written at ts1.
        res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts1)
        self.assertEqual(res.json, ["129"])

        res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts2)
        self.assertEqual(res.json, [])

        res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % (ts1 - 1))
        self.assertEqual(sorted(res.json), ["128", "129"])

        # "older"
        # Returns only ids for objects in the collection that have been last
        # modified before the timestamp given.

        self.retry_delete(self.root + "/storage/xxx_col2")

        bso = {"id": "128", "payload": "x"}
        res = self.retry_put_json(self.root + "/storage/xxx_col2/128", bso)
        ts1 = float(res.headers["X-Last-Modified"])

        bso = {"id": "129", "payload": "x"}
        res = self.retry_put_json(self.root + "/storage/xxx_col2/129", bso)
        ts2 = float(res.headers["X-Last-Modified"])

        self.assertTrue(ts1 < ts2)

        # The bound is exclusive here too: older=ts1 omits the ts1 item.
        res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % ts1)
        self.assertEqual(res.json, [])

        res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % ts2)
        self.assertEqual(res.json, ["128"])

        res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % (ts2 + 1))
        self.assertEqual(sorted(res.json), ["128", "129"])

        # Combining both bounds selects the window between them.
        qs = "?older=%s&newer=%s" % (ts2 + 1, ts1)
        res = self.app.get(self.root + "/storage/xxx_col2" + qs)
        self.assertEqual(sorted(res.json), ["129"])

        # "full"
        # If defined, returns the full BSO, rather than just the id.
        res = self.app.get(self.root + "/storage/xxx_col2?full=1")
        keys = list(res.json[0].keys())
        keys.sort()
        wanted = ["id", "modified", "payload"]
        self.assertEqual(keys, wanted)

        res = self.app.get(self.root + "/storage/xxx_col2")
        self.assertTrue(isinstance(res.json, list))

        # "limit"
        # Sets the maximum number of ids that will be returned
        self.retry_delete(self.root + "/storage/xxx_col2")

        bsos = []
        for i in range(10):
            bso = {"id": str(i).zfill(2), "payload": "x", "sortindex": i}
            bsos.append(bso)
        self.retry_post_json(self.root + "/storage/xxx_col2", bsos)

        query_url = self.root + "/storage/xxx_col2?sort=index"
        res = self.app.get(query_url)
        all_items = res.json
        self.assertEqual(len(all_items), 10)

        res = self.app.get(query_url + "&limit=2")
        self.assertEqual(res.json, all_items[:2])

        # "offset"
        # Skips over items that have already been returned.
        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
        self.assertEqual(res.json, all_items[2:5])

        # Without a limit the rest of the items come back and no further
        # offset header is expected.
        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])
        self.assertTrue("X-Weave-Next-Offset" not in res.headers)

        res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])
        self.assertTrue("X-Weave-Next-Offset" not in res.headers)

        # "offset" again, this time ordering by descending timestamp.
        query_url = self.root + "/storage/xxx_col2?sort=newest"
        res = self.app.get(query_url)
        all_items = res.json
        self.assertEqual(len(all_items), 10)

        res = self.app.get(query_url + "&limit=2")
        self.assertEqual(res.json, all_items[:2])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
        self.assertEqual(res.json, all_items[2:5])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])
        self.assertTrue("X-Weave-Next-Offset" not in res.headers)

        res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])

        # "offset" again, this time ordering by ascending timestamp.
        query_url = self.root + "/storage/xxx_col2?sort=oldest"
        res = self.app.get(query_url)
        all_items = res.json
        self.assertEqual(len(all_items), 10)

        res = self.app.get(query_url + "&limit=2")
        self.assertEqual(res.json, all_items[:2])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
        self.assertEqual(res.json, all_items[2:5])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])
        self.assertTrue("X-Weave-Next-Offset" not in res.headers)

        res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])

        # "offset" once more, this time with no explicit ordering
        # (the URL ends in a bare "?", so the appended "&..." parameters
        # follow an empty query string).
        query_url = self.root + "/storage/xxx_col2?"
        res = self.app.get(query_url)
        all_items = res.json
        self.assertEqual(len(all_items), 10)

        res = self.app.get(query_url + "&limit=2")
        self.assertEqual(res.json, all_items[:2])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
        self.assertEqual(res.json, all_items[2:5])

        next_offset = res.headers["X-Weave-Next-Offset"]
        res = self.app.get(query_url + "&offset=" + next_offset)
        self.assertEqual(res.json, all_items[5:])
        self.assertTrue("X-Weave-Next-Offset" not in res.headers)

        # NOTE(review): unlike the three sorted variants above, this final
        # response is never compared with all_items[5:]; only a successful
        # status is checked. Confirm whether the assertion was omitted on
        # purpose.
        res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)

        # "sort"
        #   'newest': Orders by timestamp number (newest first)
        #   'oldest': Orders by timestamp number (oldest first)
        #   'index':  Orders by the sortindex descending (highest weight first)
        self.retry_delete(self.root + "/storage/xxx_col2")

        # One POST per item so that each gets its own timestamp.
        for index, sortindex in (("00", -1), ("01", 34), ("02", 12)):
            bso = {"id": index, "payload": "x", "sortindex": sortindex}
            self.retry_post_json(self.root + "/storage/xxx_col2", [bso])

        res = self.app.get(self.root + "/storage/xxx_col2?sort=newest")
        res = res.json
        self.assertEqual(res, ["02", "01", "00"])

        res = self.app.get(self.root + "/storage/xxx_col2?sort=oldest")
        res = res.json
        self.assertEqual(res, ["00", "01", "02"])

        res = self.app.get(self.root + "/storage/xxx_col2?sort=index")
        res = res.json
        self.assertEqual(res, ["01", "02", "00"])

    def test_alternative_formats(self):
        """Check content negotiation on a collection GET via the Accept header."""
        coll_url = self.root + "/storage/xxx_col2"
        expected_ids = ["00", "01", "02", "03", "04"]
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
        self.retry_post_json(coll_url, bsos)

        # Accept: application/json
        res = self.app.get(coll_url, headers=[("Accept", "application/json")])
        self.assertEqual(res.content_type.split(";")[0], "application/json")
        self.assertEqual(sorted(res.json), expected_ids)

        # Accept: application/newlines -- one JSON value per line, with a
        # trailing newline after the last one.
        res = self.app.get(coll_url, headers=[("Accept", "application/newlines")])
        self.assertEqual(res.content_type, "application/newlines")
        self.assertTrue(res.body.endswith(b"\n"))
        lines = res.body.decode("utf-8").strip().split("\n")
        self.assertEqual(sorted(json_loads(line) for line in lines), expected_ids)

        # With no Accept header the response defaults to JSON.
        res = self.app.get(coll_url)
        self.assertEqual(res.content_type.split(";")[0], "application/json")

        # An unknown format is answered with a 406.
        self.app.get(coll_url, headers=[("Accept", "x/yy")], status=406)

    def test_set_collection_with_if_modified_since(self):
        """Check X-If-Modified-Since handling on a collection GET.

        Five items are written one POST at a time so they carry different
        timestamps. A GET conditioned on the oldest item timestamp must
        return 200, and one conditioned on the newest must return 304 while
        still carrying an ``X-Last-Modified`` header.
        """
        coll_url = self.root + "/storage/xxx_col2"
        # Create five items with different timestamps.
        for i in range(5):
            bsos = [{"id": str(i).zfill(2), "payload": "xxx"}]
            self.retry_post_json(coll_url, bsos)
        # Get them all, along with their timestamps.
        items = self.app.get(coll_url + "?full=true").json
        self.assertEqual(len(items), 5)
        timestamps = sorted(item["modified"] for item in items)
        # The timestamp of the collection should be the max of all those,
        # so the oldest value yields 200 and the newest yields 304.
        self.app.get(
            coll_url,
            headers={"X-If-Modified-Since": str(timestamps[0])},
            status=200,
        )
        res = self.app.get(
            coll_url,
            headers={"X-If-Modified-Since": str(timestamps[-1])},
            status=304,
        )
        # assertIn names the missing header on failure.
        self.assertIn("X-Last-Modified", res.headers)

    def test_get_item(self):
        """Check single-item GET, including 404 and X-If-Modified-Since handling."""
        item_url = self.root + "/storage/xxx_col2/01"
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
        self.retry_post_json(self.root + "/storage/xxx_col2", bsos)

        # Fetch object "01" from xxx_col2 and check its shape.
        item = self.app.get(item_url).json
        self.assertEqual(sorted(item.keys()), ["id", "modified", "payload"])
        self.assertEqual(item["id"], "01")

        # An object that does not exist yields a 404.
        self.app.get(self.root + "/storage/xxx_col2/99", status=404)

        # X-If-Modified-Since at or after the item's timestamp gives 304.
        modified = item["modified"]
        for since in (modified, modified + 1):
            self.app.get(
                item_url,
                headers={"X-If-Modified-Since": str(since)},
                status=304,
            )
        # An earlier timestamp returns the item as usual.
        res = self.app.get(
            item_url,
            headers={"X-If-Modified-Since": str(modified - 1)},
        )
        self.assertEqual(res.json["id"], "01")

    def test_set_item(self):
        """Check that a PUT creates an item and a second PUT overwrites it."""
        item_url = self.root + "/storage/xxx_col2/12345"
        # First pass creates the object, second pass updates it; after each
        # write the stored payload must read back unchanged.
        for payload in (_PLD, "YYY"):
            self.retry_put_json(item_url, {"payload": payload})
            stored = self.app.get(item_url).json
            self.assertEqual(stored["payload"], payload)

    def test_set_collection(self):
        """Check multi-item POSTs: create, update, and reject a bad sortindex."""
        coll_url = self.root + "/storage/xxx_col2"

        def stored_payload(bso_id):
            # Read one item back and return just its payload.
            return self.app.get(coll_url + "/" + bso_id).json["payload"]

        # Send two BSOs and confirm both were stored.
        first_batch = [
            {"id": "12", "payload": _PLD},
            {"id": "13", "payload": _PLD},
        ]
        self.retry_post_json(coll_url, first_batch)
        self.assertEqual(stored_payload("12"), _PLD)
        self.assertEqual(stored_payload("13"), _PLD)

        # Send again: "13" is updated and "14" is new.
        second_batch = [
            {"id": "13", "payload": "XyX"},
            {"id": "14", "payload": _PLD},
        ]
        self.retry_post_json(coll_url, second_batch)
        self.assertEqual(stored_payload("14"), _PLD)
        self.assertEqual(stored_payload("13"), "XyX")

        # Send two BSOs, one with a bad sortindex: the POST itself succeeds
        # but the invalid item must not have been stored.
        third_batch = [
            {"id": "one", "payload": _PLD},
            {"id": "two", "payload": _PLD, "sortindex": "FAIL"},
        ]
        self.retry_post_json(coll_url, third_batch)
        self.app.get(coll_url + "/two", status=404)

    def test_set_collection_input_formats(self):
        """Check which request content types a collection POST accepts."""
        coll_url = self.root + "/storage/xxx_col2"
        bsos = [{"id": "12", "payload": _PLD}, {"id": "13", "payload": _PLD}]

        # A newline-delimited body sent as application/newlines is accepted.
        newline_body = "\n".join(json_dumps(bso) for bso in bsos)
        self.app.post(
            coll_url,
            newline_body,
            headers={"Content-Type": "application/newlines"},
        )
        self.assertEqual(len(self.app.get(coll_url).json), 2)

        # An unknown content type is rejected with 415 and stores nothing.
        self.retry_delete(coll_url)
        json_body = json_dumps(bsos)
        self.app.post(
            coll_url,
            json_body,
            headers={"Content-Type": "application/octet-stream"},
            status=415,
        )
        self.assertEqual(len(self.app.get(coll_url).json), 0)

    def test_set_item_input_formats(self):
        """Check which request content types a single-item PUT accepts."""
        item_url = self.root + "/storage/xxx_col2/TEST"
        body = json_dumps({"payload": _PLD})

        # A JSON body labelled application/json is accepted.
        self.app.put(item_url, body, headers={"Content-Type": "application/json"})
        self.assertEqual(self.app.get(item_url).json["payload"], _PLD)

        # The same JSON under some other content type fails with 415 and
        # leaves nothing stored.
        self.retry_delete(self.root + "/storage/xxx_col2")
        self.app.put(
            item_url,
            body,
            headers={"Content-Type": "application/octet-stream"},
            status=415,
        )
        self.app.get(item_url, status=404)

        # text/plain is the exception: a backwards-compatibility special case.
        self.app.put(item_url, body, headers={"Content-Type": "text/plain"})
        self.assertEqual(self.app.get(item_url).json["payload"], _PLD)

    def test_app_newlines_when_payloads_contain_newlines(self):
        """Check application/newlines round-trips payloads containing newlines."""
        coll_url = self.root + "/storage/xxx_col2"
        bsos = [
            {"id": "01", "payload": "hello\nworld"},
            {"id": "02", "payload": "\nmarco\npolo\n"},
        ]

        def check_payloads(items):
            # Both items must come back with their payloads unchanged.
            self.assertEqual(len(items), 2)
            items.sort(key=lambda bso: bso["id"])
            for got, sent in zip(items, bsos):
                self.assertEqual(got["payload"], sent["payload"])

        # Upload as application/newlines; the embedded newline characters
        # are escaped by the JSON encoding, so the body has exactly two lines.
        body = "\n".join(json_dumps(bso) for bso in bsos)
        self.assertEqual(len(body.split("\n")), 2)
        self.app.post(
            coll_url,
            body,
            headers={"Content-Type": "application/newlines"},
        )

        # Read them back as a JSON list.
        check_payloads(self.app.get(coll_url + "?full=1").json)

        # Read them back as application/newlines.
        res = self.app.get(
            coll_url + "?full=1",
            headers={"Accept": "application/newlines"},
        )
        lines = res.body.decode("utf-8").strip().split("\n")
        check_payloads([json_loads(line) for line in lines])

    def test_collection_usage(self):
        """Check that /info/collection_usage reports payload size in KB."""
        self.retry_delete(self.root + "/storage")

        bsos = [{"id": "13", "payload": "XyX"}, {"id": "14", "payload": _PLD}]
        self.retry_post_json(self.root + "/storage/xxx_col2", bsos)

        usage = self.app.get(self.root + "/info/collection_usage").json
        reported_kb = usage["xxx_col2"]
        # Expected usage is the combined payload length divided by 1024,
        # compared after rounding both sides to two decimal places.
        expected_kb = sum(len(bso["payload"]) for bso in bsos) / 1024.0
        self.assertEqual(round(reported_kb, 2), round(expected_kb, 2))

    def test_delete_collection_items(self):
        """Check deleting a whole collection and deleting by ``ids`` list."""
        coll_url = self.root + "/storage/xxx_col2"

        def item_count():
            # Number of ids currently listed for the collection.
            return len(self.app.get(coll_url).json)

        # Create a collection of three items.
        bsos = [{"id": bso_id, "payload": _PLD} for bso_id in ("12", "13", "14")]
        self.retry_post_json(coll_url, bsos)
        self.assertEqual(item_count(), 3)

        # Deleting the collection removes every item.
        self.retry_delete(coll_url)
        self.assertEqual(item_count(), 0)

        # With ?ids=..., only the objects named in the comma-separated list
        # are deleted.
        self.retry_post_json(coll_url, bsos)
        self.assertEqual(item_count(), 3)
        self.retry_delete(coll_url + "?ids=12,14")
        self.assertEqual(item_count(), 1)
        self.retry_delete(coll_url + "?ids=13")
        self.assertEqual(item_count(), 0)

    def test_delete_item(self):
        """Check single-item DELETE.

        Deleting an existing item removes it from the collection, deleting a
        nonexistent item returns 404, and the ``X-Last-Modified`` reported by
        /info/collections moves past the pre-delete collection timestamp.
        """
        coll_url = self.root + "/storage/xxx_col2"
        # Create a collection of three items.
        bsos = [{"id": bso_id, "payload": _PLD} for bso_id in ("12", "13", "14")]
        self.retry_post_json(coll_url, bsos)
        res = self.app.get(coll_url)
        self.assertEqual(len(res.json), 3)
        ts = float(res.headers["X-Last-Modified"])

        # Delete item 13.
        self.retry_delete(coll_url + "/13")
        res = self.app.get(coll_url)
        self.assertEqual(len(res.json), 2)

        # A nonexistent item should return a 404.
        self.retry_delete(coll_url + "/12982", status=404)

        # The collection should get an updated timestamp. assertLess shows
        # both values on failure, unlike assertTrue(a < b).
        res = self.app.get(self.root + "/info/collections")
        self.assertLess(ts, float(res.headers["X-Last-Modified"]))

    def test_delete_storage(self):
        """Check that DELETE on /storage removes all of the user's data.

        After the storage-wide delete the collection must list no items, and
        a further DELETE on the now-empty collection must still return 200
        and leave it empty.
        """
        coll_url = self.root + "/storage/xxx_col2"
        # Create a collection of three items.
        bsos = [{"id": bso_id, "payload": _PLD} for bso_id in ("12", "13", "14")]
        self.retry_post_json(coll_url, bsos)
        res = self.app.get(coll_url)
        self.assertEqual(len(res.json), 3)

        # Delete everything.
        self.retry_delete(self.root + "/storage")
        items = self.app.get(coll_url).json
        self.assertEqual(len(items), 0)

        # Deleting the empty collection succeeds. Re-fetch the listing so
        # the final assertion checks the state after this DELETE rather than
        # the list obtained before it.
        self.retry_delete(coll_url, status=200)
        items = self.app.get(coll_url).json
        self.assertEqual(len(items), 0)

    def test_x_timestamp_header(self):
        """Check that GET, PUT and POST responses carry a current X-Weave-Timestamp.

        Skipped when ``self.distant`` is set. For each request the local
        clock is sampled (rounded to two decimals) before the call, and the
        server's header value must not be earlier than that sample.
        """
        if self.distant:
            pytest.skip("Test cannot be run against a live server.")

        coll_url = self.root + "/storage/xxx_col2"

        def weave_timestamp(response):
            # Server timestamp header parsed as a float.
            return float(response.headers["X-Weave-Timestamp"])

        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
        self.retry_post_json(coll_url, bsos)

        # Timestamp on a GET. The short sleep presumably keeps the server
        # time from landing behind the rounded local sample.
        now = round(time.time(), 2)
        time.sleep(0.01)
        res = self.app.get(coll_url)
        self.assertLessEqual(now, weave_timestamp(res))

        # Timestamp on a PUT; it must also be within 200 of the local
        # sample.
        now = round(time.time(), 2)
        time.sleep(0.01)
        bso = {"payload": _PLD}
        res = self.retry_put_json(coll_url + "/12345", bso)
        self.assertLessEqual(now, weave_timestamp(res))
        self.assertLessEqual(abs(now - weave_timestamp(res)), 200)

        # Timestamp on a POST.
        now = round(time.time(), 2)
        time.sleep(0.01)
        bsos = [{"id": "12", "payload": _PLD}, {"id": "13", "payload": _PLD}]
        res = self.retry_post_json(coll_url, bsos)
        self.assertLessEqual(now, weave_timestamp(res))

    def test_ifunmodifiedsince(self):
        """Exercise the X-If-Unmodified-Since header on items and collections.

        First half: a header value older than the stored data must produce
        412 for PUT, DELETE, POST and GET, and must leave the BSO untouched.
        Second half: a header value equal to the current X-Last-Modified
        (or "0" for a since-deleted item) must let the request succeed.
        The ``res``/``res2``/``ts`` variables are rebound several times, so
        the statements depend on running in exactly this order.
        """
        bso = {"id": "12345", "payload": _PLD}
        res = self.retry_put_json(self.root + "/storage/xxx_col2/12345", bso)
        # Using an X-If-Unmodified-Since in the past should cause 412s.
        ts = str(float(res.headers["X-Last-Modified"]) - 1)
        bso = {"id": "12345", "payload": _PLD + "XXX"}
        res = self.retry_put_json(
            self.root + "/storage/xxx_col2/12345",
            bso,
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        self.assertTrue("X-Last-Modified" in res.headers)
        res = self.retry_delete(
            self.root + "/storage/xxx_col2/12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        self.assertTrue("X-Last-Modified" in res.headers)
        # The next four requests do not rebind ``res``; it keeps pointing at
        # the 412 response from the item DELETE above.
        self.retry_post_json(
            self.root + "/storage/xxx_col2",
            [bso],
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        self.retry_delete(
            self.root + "/storage/xxx_col2?ids=12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        self.app.get(
            self.root + "/storage/xxx_col2/12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        self.app.get(
            self.root + "/storage/xxx_col2",
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        # Deleting items from a collection should give 412 even if some
        # other, unrelated item in the collection has been modified.
        # ``ts`` is now the X-Last-Modified carried by the earlier 412
        # response, i.e. a timestamp from before item 54321 is written.
        ts = res.headers["X-Last-Modified"]
        res2 = self.retry_put_json(
            self.root + "/storage/xxx_col2/54321",
            {
                "payload": _PLD,
            },
        )
        self.retry_delete(
            self.root + "/storage/xxx_col2?ids=12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=412,
        )
        # Keep the timestamp of the 54321 write for the success cases below.
        ts = res2.headers["X-Last-Modified"]
        # All of those should have left the BSO unchanged
        res2 = self.app.get(self.root + "/storage/xxx_col2/12345")
        self.assertEqual(res2.json["payload"], _PLD)
        self.assertEqual(
            res2.headers["X-Last-Modified"], res.headers["X-Last-Modified"]
        )
        # Using an X-If-Unmodified-Since equal to
        # X-Last-Modified should allow the request to succeed.
        res = self.retry_post_json(
            self.root + "/storage/xxx_col2",
            [bso],
            headers=[("X-If-Unmodified-Since", ts)],
            status=200,
        )
        ts = res.headers["X-Last-Modified"]
        self.app.get(
            self.root + "/storage/xxx_col2/12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=200,
        )
        self.retry_delete(
            self.root + "/storage/xxx_col2/12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=200,
        )
        # The item was just deleted; a header value of "0" is expected to
        # let the PUT re-create it.
        res = self.retry_put_json(
            self.root + "/storage/xxx_col2/12345",
            bso,
            headers=[("X-If-Unmodified-Since", "0")],
            status=200,
        )
        ts = res.headers["X-Last-Modified"]
        self.app.get(
            self.root + "/storage/xxx_col2",
            headers=[("X-If-Unmodified-Since", ts)],
            status=200,
        )
        self.retry_delete(
            self.root + "/storage/xxx_col2?ids=12345",
            headers=[("X-If-Unmodified-Since", ts)],
            status=200,
        )

    def test_quota(self):
        """Storing one BSO must raise the reported quota usage accordingly.

        The expected increase is the payload length divided by 1024.
        """
        quota_url = self.root + "/info/quota"
        usage_before = self.app.get(quota_url).json[0]
        self.retry_put_json(self.root + "/storage/xxx_col2/12345", {"payload": _PLD})
        usage_after = self.app.get(quota_url).json[0]
        expected_growth = len(_PLD) / 1024.0
        self.assertEqual(usage_after - usage_before, expected_growth)

    def test_get_collection_ttl(self):
        """Expired BSOs must no longer show up in the collection listing."""
        coll_url = self.root + "/storage/xxx_col2"

        # A ttl of zero: after a short wait the listing must be empty.
        self.retry_put_json(
            self.root + "/storage/xxx_col2/12345", {"payload": _PLD, "ttl": 0}
        )
        time.sleep(1.1)
        listing = self.app.get(coll_url)
        self.assertEqual(listing.json, [])

        # A ttl of two seconds: the item should exist right after the upload.
        item_url = self.root + "/storage/xxx_col2/123456"
        short_lived = {"payload": _PLD, "ttl": 2}
        self.retry_put_json(item_url, short_lived)
        listing = self.app.get(coll_url)
        self.assertEqual(len(listing.json), 1)

        # Putting the same item a second time must still leave one entry.
        self.retry_put_json(item_url, short_lived)
        listing = self.app.get(coll_url)
        self.assertEqual(len(listing.json), 1)

        # Once the ttl has elapsed, the listing is empty again.
        time.sleep(2.1)
        listing = self.app.get(coll_url)
        self.assertEqual(len(listing.json), 0)

    def test_multi_item_post_limits(self):
        """Check the record-count and payload-size limits on batch POSTs.

        The limits are read from ``/info/configuration``.  If that view does
        not report them, the test falls back to the local configuration, or
        is skipped when running against a live server.
        """
        res = self.app.get(self.root + "/info/configuration")
        try:
            max_bytes = res.json["max_post_bytes"]
            max_count = res.json["max_post_records"]
            max_req_bytes = res.json["max_request_bytes"]
        except KeyError:
            # Can't run against live server if it doesn't
            # report the right config options.
            if self.distant:
                # State the reason so a skipped run is self-explanatory.
                pytest.skip(
                    "Live server does not report its POST limits "
                    "in /info/configuration."
                )
            max_bytes = get_limit_config(self.config, "max_post_bytes")
            max_count = get_limit_config(self.config, "max_post_records")
            max_req_bytes = get_limit_config(self.config, "max_request_bytes")

        # Uploading max_count-5 small objects should succeed.
        bsos = [{"id": str(i).zfill(2), "payload": "X"} for i in range(max_count - 5)]
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos).json
        self.assertEqual(len(res["success"]), max_count - 5)
        self.assertEqual(len(res["failed"]), 0)

        # Uploading max_count+5 items should produce five failures.
        bsos = [{"id": str(i).zfill(2), "payload": "X"} for i in range(max_count + 5)]
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos).json
        self.assertEqual(len(res["success"]), max_count)
        self.assertEqual(len(res["failed"]), 5)

        # Uploading items such that the last item puts us over the
        # cumulative limit on payload size, should produce 1 failure.
        # The item_size here is arbitrary, so I made it a prime in kB.
        item_size = 227 * 1024
        max_items, leftover = divmod(max_bytes, item_size)
        bsos = [
            {"id": str(i).zfill(2), "payload": "X" * item_size}
            for i in range(max_items)
        ]
        # The final item is one byte larger than the remaining allowance.
        bsos.append({"id": str(max_items), "payload": "X" * (leftover + 1)})

        # Check that we don't go over the limit on raw request bytes,
        # which would get us rejected in production with a 413.
        # assertLess reports both values if the check fails.
        self.assertLess(len(json.dumps(bsos)), max_req_bytes)

        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos).json
        self.assertEqual(len(res["success"]), max_items)
        self.assertEqual(len(res["failed"]), 1)

    def test_weird_args(self):
        """Malformed query arguments give a 400; unknown ones are tolerated."""
        # Seed xxx_col2 with some data.
        seed = [{"id": str(n).zfill(2), "payload": _PLD} for n in range(10)]
        upload = self.retry_post_json(self.root + "/storage/xxx_col2", seed)
        _ = upload.json  # the original also read the JSON body without using it

        # Random text is not an acceptable value for these arguments, so the
        # server must answer with a 400.
        # Note: "Offset" is a string since the bsoid could be anything.
        # skipping that for now.
        for arg in ("newer", "older", "limit"):
            garbage = randtext()
            self.app.get(
                self.root + "/storage/xxx_col2?%s=%s" % (arg, garbage),
                status=400,
            )

        # A long ids= list of random ids simply matches nothing.
        random_ids = ",".join([randtext(10) for _ in range(100)])
        listing = self.app.get(self.root + "/storage/xxx_col2?ids=%s" % random_ids)
        self.assertEqual(listing.json, [])

        # Unexpected arguments should not break the request.
        self.app.get(self.root + "/storage/xxx_col2?blabla=1", status=200)

    def test_guid_deletion(self):
        """GUID-shaped ids can be deleted in bulk through the ids= argument."""
        # Upload five items whose ids differ only in the last digit.
        guids = ["6820f3ca-6e8a-4ff4-8af7-8b3625d7d65%d" % n for n in range(5)]
        upload = self.retry_post_json(
            self.root + "/storage/passwords",
            [{"id": guid, "payload": _PLD} for guid in guids],
        )
        self.assertEqual(len(upload.json["success"]), 5)

        # Delete the first two of them.
        doomed = ",".join(guids[:2])
        self.retry_delete(self.root + "/storage/passwords?ids=%s" % doomed)

        # The deleted ids are gone and the other three remain.
        gone = self.app.get(self.root + "/storage/passwords?ids=%s" % doomed)
        self.assertEqual(len(gone.json), 0)
        remaining = self.app.get(self.root + "/storage/passwords")
        self.assertEqual(len(remaining.json), 3)

    def test_specifying_ids_with_percent_encoded_query_string(self):
        """A percent-encoded ``ids`` query argument must be decoded by the server."""
        # Imported locally: the module only does ``import urllib``, which does
        # not guarantee that a submodule is loaded, and ``quote`` is documented
        # in ``urllib.parse`` (``urllib.request`` merely re-exports it).
        from urllib.parse import quote

        # create some items
        bsos = [{"id": "test-%d" % i, "payload": _PLD} for i in range(5)]
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
        self.assertEqual(len(res.json["success"]), 5)
        # now delete some of them; quoting turns the "," separator into "%2C"
        ids = quote(",".join("test-%d" % i for i in range(2)))
        self.retry_delete(self.root + "/storage/xxx_col2?ids=%s" % ids)
        # check that the correct items were deleted
        res = self.app.get(self.root + "/storage/xxx_col2?ids=%s" % ids)
        self.assertEqual(len(res.json), 0)
        res = self.app.get(self.root + "/storage/xxx_col2")
        self.assertEqual(len(res.json), 3)

    def test_timestamp_numbers_are_decimals(self):
        """Timestamps have at most two decimal places and work with ``newer=``.

        On failure of the ``newer=`` check, the whole collection is dumped
        into the assertion message to help diagnose the problem.
        """
        # Create five items with different timestamps.
        for i in range(5):
            bsos = [{"id": str(i).zfill(2), "payload": "xxx"}]
            self.retry_post_json(self.root + "/storage/xxx_col2", bsos)

        # make sure the server returns only proper precision timestamps.
        resp = self.app.get(self.root + "/storage/xxx_col2?full=1")
        bsos = json_loads(resp.body)
        timestamps = []
        for bso in bsos:
            ts = bso["modified"]
            # Timestamps have a resolution of hundredths of a second, and a
            # trailing zero may be dropped (".1" instead of ".10").  We just
            # don't want more than two digits after the decimal point.
            self.assertLessEqual(len(str(ts).split(".")[-1]), 2)
            timestamps.append(ts)

        timestamps.sort()

        # try a newer filter now, to get the last two objects
        ts = float(timestamps[-3])

        # Returns only ids for objects in the collection that have been
        # last modified since the timestamp given.
        res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts).json
        try:
            self.assertEqual(sorted(res), ["03", "04"])
        except AssertionError as err:
            # need to display the whole collection to understand the issue
            dump = self.app.get(self.root + "/storage/xxx_col2?full=1")
            msg = "Timestamp used: %s" % ts
            # The response body is bytes; decode it before concatenating,
            # otherwise a TypeError would hide the real failure.
            msg += " " + dump.body.decode("utf-8")
            msg += " Timestamps received: %s" % str(timestamps)
            msg += " Result of newer query: %s" % res
            raise AssertionError(msg) from err

    def test_strict_newer(self):
        """``newer=`` must exclude items modified exactly at the given time."""
        coll_url = self.root + "/storage/xxx_meh"

        # First batch: its timestamp becomes the newer= threshold.
        first_batch = [{"id": bso_id, "payload": _PLD} for bso_id in ("01", "02")]
        upload = self.retry_post_json(coll_url, first_batch)
        threshold = float(upload.headers["X-Last-Modified"])

        # Second batch, uploaded afterwards.
        second_batch = [{"id": bso_id, "payload": _PLD} for bso_id in ("03", "04")]
        self.retry_post_json(coll_url, second_batch)

        # The first batch sits exactly on the threshold, so only the second
        # batch may be returned.
        listing = self.app.get(self.root + "/storage/xxx_meh?newer=%s" % threshold)
        self.assertEqual(sorted(listing.json), ["03", "04"])

    def test_strict_older(self):
        """``older=`` must exclude items modified exactly at the given time."""
        coll_url = self.root + "/storage/xxx_meh"

        # First batch, uploaded before the threshold is taken.
        first_batch = [{"id": bso_id, "payload": _PLD} for bso_id in ("01", "02")]
        self.retry_post_json(coll_url, first_batch)

        # Second batch: its timestamp becomes the older= threshold.
        second_batch = [{"id": bso_id, "payload": _PLD} for bso_id in ("03", "04")]
        upload = self.retry_post_json(coll_url, second_batch)
        threshold = float(upload.headers["X-Last-Modified"])

        # The second batch sits exactly on the threshold, so only the first
        # batch may be returned.
        listing = self.app.get(self.root + "/storage/xxx_meh?older=%s" % threshold)
        self.assertEqual(sorted(listing.json), ["01", "02"])

    def test_handling_of_invalid_json_in_bso_uploads(self):
        """Uploads with valid JSON of the wrong shape must be rejected."""
        coll_url = self.root + "/storage/xxx_col2"

        # Single uploads whose JSON body is not a BSO.
        not_a_bso = (
            "notabso",
            42,
            {"id": ["01", "02"], "payload": {"3": "4"}},
        )
        for body in not_a_bso:
            rejected = self.retry_put_json(
                self.root + "/storage/xxx_col2/invalid", body, status=400
            )
            self.assertEqual(rejected.json, WEAVE_INVALID_WBO)

        # Batch uploads whose JSON body is not a list of BSOs.
        for body in ("notalist", 42):
            rejected = self.retry_post_json(coll_url, body, status=400)
            self.assertEqual(rejected.json, WEAVE_INVALID_WBO)

        # A list containing something that is not a valid data dict
        # should fail out entirely, as the input is seriously broken.
        broken_batch = [{"id": "01", "payload": "GOOD"}, "BAD"]
        self.retry_post_json(coll_url, broken_batch, status=400)

        # A list containing an invalid BSO should have the good entry
        # processed and the bad one reported as failed.
        mixed_batch = [{"id": "01", "payload": "GOOD"}, {"id": "02", "invalid": "ya"}]
        outcome = self.retry_post_json(coll_url, mixed_batch).json
        self.assertEqual(len(outcome["success"]), 1)
        self.assertEqual(len(outcome["failed"]), 1)

    def test_handling_of_invalid_bso_fields(self):
        """BSOs with invalid field values must be refused by POST and PUT."""
        coll_url = self.root + "/storage/xxx_col2"

        def assert_post_fails(candidate):
            # A batch POST must list the item under "failed" and nothing
            # under "success".
            outcome = self.retry_post_json(coll_url, [candidate]).json
            self.assertTrue(outcome["failed"] and not outcome["success"])

        def assert_put_fails(candidate):
            # A direct PUT must answer 400 with the invalid-WBO error code.
            rejected = self.retry_put_json(
                coll_url + "/" + candidate["id"], candidate, status=400
            )
            self.assertEqual(rejected.json, WEAVE_INVALID_WBO)

        # Invalid ids; these are only exercised through POST.
        invalid_ids = (
            # Unacceptable characters.  The newline cases are especially
            # nuanced because \n gets special treatment from the regex library.
            "A\nB",
            "A\n",
            "\nN",
            "A\tB",
            # The empty string is not acceptable.
            "",
            # Too long.
            "X" * 65,
        )
        for bad_id in invalid_ids:
            assert_post_fails({"id": bad_id, "payload": "testing"})

        # The PUT check for the too-long id is disabled.  It would reuse the
        # BSO above, which should return a 400:
        #   res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=404)

        # Invalid field values; these are exercised through POST and then PUT.
        invalid_bsos = (
            # sortindex - not an integer
            {"id": "TEST", "payload": "testing", "sortindex": "xxx_meh"},
            # sortindex - not an integer
            {"id": "TEST", "payload": "testing", "sortindex": "2.6"},
            # sortindex - larger than max value
            {"id": "TEST", "payload": "testing", "sortindex": "1" + "0" * 9},
            # payload - not a string
            {"id": "TEST", "payload": 42},
            # ttl - not an integer
            {"id": "TEST", "payload": "testing", "ttl": "eh?"},
            # ttl - not an integer
            {"id": "TEST", "payload": "testing", "ttl": "4.2"},
            # unknown field
            {"id": "TEST", "unexpected": "spanish-inquisition"},
        )
        for candidate in invalid_bsos:
            assert_post_fails(candidate)
            assert_put_fails(candidate)

    def test_that_batch_gets_are_limited_to_max_number_of_ids(self):
        """A GET with more than BATCH_MAX_IDS ids must be refused with a 400."""
        self.retry_put_json(
            self.root + "/storage/xxx_col2/01", {"id": "01", "payload": "testing"}
        )

        def ids_query(count):
            # URL asking for the ids "00", "01", ... up to `count` entries.
            wanted = ",".join(str(n).zfill(2) for n in range(count))
            return self.root + "/storage/xxx_col2?ids=" + wanted

        # Below the limit and exactly at the limit both work, and return the
        # one stored item.
        for count in (BATCH_MAX_IDS - 1, BATCH_MAX_IDS):
            found = self.app.get(ids_query(count))
            self.assertEqual(found.json, ["01"])

        # One more than the limit fails.
        self.app.get(ids_query(BATCH_MAX_IDS + 1), status=400)

    def test_that_batch_deletes_are_limited_to_max_number_of_ids(self):
        """A DELETE with more than BATCH_MAX_IDS ids must be refused with a 400."""
        item_url = self.root + "/storage/xxx_col2/01"
        item = {"id": "01", "payload": "testing"}

        def ids_query(count):
            # URL naming the ids "00", "01", ... up to `count` entries.
            doomed = ",".join(str(n).zfill(2) for n in range(count))
            return self.root + "/storage/xxx_col2?ids=" + doomed

        # Below the limit and exactly at the limit both work.  The item is
        # (re)stored before every attempt.
        for count in (BATCH_MAX_IDS - 1, BATCH_MAX_IDS):
            self.retry_put_json(item_url, item)
            self.retry_delete(ids_query(count))

        # One more than the limit fails.
        self.retry_put_json(item_url, item)
        self.retry_delete(ids_query(BATCH_MAX_IDS + 1), status=400)

    def test_that_expired_items_can_be_overwritten_via_PUT(self):
        """A PUT to the id of an expired item must still succeed."""
        item_url = self.root + "/storage/xxx_col2/TEST"
        # Store an item with a ttl of zero and give it a moment to expire.
        self.retry_put_json(item_url, {"payload": "XYZ", "ttl": 0})
        time.sleep(0.02)
        self.app.get(item_url, status=404)
        # Writing to the same id again should still work.
        self.retry_put_json(item_url, {"payload": "XYZ", "ttl": 42})

    def test_if_modified_since_on_info_views(self):
        """The /info views must honour the X-If-Modified-Since header."""
        info_views = (
            "/info/collections",
            "/info/quota",
            "/info/collection_usage",
            "/info/collection_counts",
        )

        def expect_from_all_views(since, status):
            # Request every info view with X-If-Modified-Since set to `since`
            # and require the given HTTP status from each of them.
            conditional = {"X-If-Modified-Since": str(since)}
            for view in info_views:
                self.app.get(self.root + view, headers=conditional, status=status)

        # Store something, so the views have a modified time > 0.
        seed = [{"id": str(n).zfill(2), "payload": "xxx"} for n in range(3)]
        self.retry_post_json(self.root + "/storage/xxx_col1", seed)

        # Get the initial last-modified version.
        info = self.app.get(self.root + "/info/collections")
        ts1 = float(info.headers["X-Last-Modified"])
        self.assertTrue(ts1 > 0)

        # A value from before the latest change gives a 200 everywhere.
        expect_from_all_views(ts1 - 1, 200)
        # The timestamp of the latest change itself gives a 304 everywhere.
        expect_from_all_views(ts1, 304)

        # Change a collection.
        changed = self.retry_put_json(
            self.root + "/storage/xxx_col2/TEST", {"payload": "TEST"}
        )
        ts2 = changed.headers["X-Last-Modified"]

        # Using the previous version should read the updated data.
        expect_from_all_views(ts1, 200)
        # Using the new timestamp should produce 304s.
        expect_from_all_views(ts2, 304)

        # XXX TODO: the storage-level timestamp is not tracked correctly
        # after deleting a collection, so this test fails for now.
        # # Delete a collection.
        # r = self.retry_delete(self.root + "/storage/xxx_col2")
        # ts3 = r.headers["X-Last-Modified"]
        # # Using the previous timestamp should read the updated data.
        # headers = {"X-If-Modified-Since": str(ts2)}
        # for view in INFO_VIEWS:
        #     self.app.get(self.root + view, headers=headers, status=200)
        # # Using the new timestamp should produce 304s.
        # headers = {"X-If-Modified-Since": str(ts3)}
        # for view in INFO_VIEWS:
        #     self.app.get(self.root + view, headers=headers, status=304)

    def test_that_x_last_modified_is_sent_for_all_get_requests(self):
        """Info, collection and item GETs must all carry X-Last-Modified."""
        seed = [{"id": str(n).zfill(2), "payload": "xxx"} for n in range(5)]
        self.retry_post_json(self.root + "/storage/xxx_col2", seed)
        paths = (
            "/info/collections",
            "/info/collection_counts",
            "/storage/xxx_col2",
            "/storage/xxx_col2/01",
        )
        for path in paths:
            reply = self.app.get(self.root + path)
            self.assertIn("X-Last-Modified", reply.headers)

    def test_update_of_ttl_without_sending_data(self):
        """A PUT carrying only ``ttl`` must update, or create, the item."""
        test1_url = self.root + "/storage/xxx_col2/TEST1"
        test2_url = self.root + "/storage/xxx_col2/TEST2"
        test3_url = self.root + "/storage/xxx_col2/TEST3"

        short_lived = {"payload": "x", "ttl": 1}
        self.retry_put_json(test1_url, short_lived)
        self.retry_put_json(test2_url, short_lived)

        # Before those expire, send a ttl-only body to one item that exists
        # (TEST2) and to one that does not (TEST3).
        time.sleep(0.2)
        ttl_only = {"ttl": 10}
        self.retry_put_json(test2_url, ttl_only)
        self.retry_put_json(test3_url, ttl_only)

        # Updating some other field on TEST1 should leave its ttl untouched.
        self.retry_put_json(test1_url, {"sortindex": 3})

        # If we wait, TEST1 should expire but the others should not.
        time.sleep(0.8)
        listing = self.app.get(self.root + "/storage/xxx_col2?full=1").json
        by_id = {entry["id"]: entry for entry in listing}
        self.assertEqual(sorted(by_id), ["TEST2", "TEST3"])

        # The existing item keeps its payload; the new item gets a default
        # payload of empty string.
        self.assertEqual(by_id["TEST2"]["payload"], "x")
        self.assertEqual(by_id["TEST3"]["payload"], "")
        self.assertTrue(by_id["TEST2"]["modified"] < by_id["TEST3"]["modified"])

    def test_bulk_update_of_ttls_without_sending_data(self):
        """A batch POST may update ttls without resending the payloads."""
        coll_url = self.root + "/storage/xxx_col2"

        # Create 5 BSOs with a ttl of 1 second.
        short_lived = [
            {"id": str(n).zfill(2), "payload": "x", "ttl": 1} for n in range(5)
        ]
        first = self.retry_post_json(coll_url, short_lived)
        ts1 = float(first.headers["X-Last-Modified"])

        # Before they expire, bulk-update the ttl to something longer.
        # Ids 05 and 06 do not exist yet, and item 03 also gets a new
        # payload in the same request.
        time.sleep(0.2)
        updates = [{"id": str(n).zfill(2), "ttl": 10} for n in range(3, 7)]
        updates[0]["payload"] = "xx"
        second = self.retry_post_json(coll_url, updates)
        self.assertEqual(len(second.json["success"]), 4)
        ts2 = float(second.headers["X-Last-Modified"])

        # If we wait then items 0, 1, 2 should have expired.
        # Items 3, 4, 5, 6 should still exist.
        time.sleep(0.8)
        listing = self.app.get(self.root + "/storage/xxx_col2?full=1").json
        by_id = {entry["id"]: entry for entry in listing}
        self.assertEqual(sorted(by_id), ["03", "04", "05", "06"])

        # Items 3 and 4 should have the specified payloads.
        # Items 5 and 6 should have payload defaulted to empty string.
        expected_payloads = (("03", "xx"), ("04", "x"), ("05", ""), ("06", ""))
        for bso_id, payload in expected_payloads:
            self.assertEqual(by_id[bso_id]["payload"], payload)

        # Items created or modified by the request get the new timestamp.
        # Just bumping the ttl (item 04) should not bump the timestamp.
        expected_modified = (("03", ts2), ("04", ts1), ("05", ts2), ("06", ts2))
        for bso_id, modified in expected_modified:
            self.assertEqual(by_id[bso_id]["modified"], modified)

    def test_that_negative_integer_fields_are_not_accepted(self):
        """Negative ttl, limit and precondition headers give a 400.

        A negative sortindex, on the other hand, is accepted.
        """
        item_url = self.root + "/storage/xxx_col2/TEST"
        coll_url = self.root + "/storage/xxx_col2"

        # ttls cannot be negative
        negative_ttl = {"payload": "TEST", "ttl": -1}
        self.retry_put_json(item_url, negative_ttl, status=400)

        # limit cannot be negative
        self.retry_put_json(item_url, {"payload": "X"})
        self.app.get(self.root + "/storage/xxx_col2?limit=-1", status=400)

        # X-If-Modified-Since cannot be negative
        self.app.get(coll_url, headers={"X-If-Modified-Since": "-3"}, status=400)

        # X-If-Unmodified-Since cannot be negative
        self.retry_put_json(
            item_url,
            {"payload": "TEST"},
            headers={"X-If-Unmodified-Since": "-3"},
            status=400,
        )

        # sortindex actually *can* be negative
        negative_sortindex = {"payload": "TEST", "sortindex": -42}
        self.retry_put_json(item_url, negative_sortindex, status=200)

    def test_meta_global_sanity(self):
        """Basic create/read checks on the ``meta/global`` record."""
        # Memcache backend is configured to store 'meta' in write-through
        # cache, so we want to check it explicitly.  We might as well put it
        # in the base tests because there's nothing memcached-specific here.
        meta_url = self.root + "/storage/meta"
        global_url = self.root + "/storage/meta/global"

        # Nothing is there to begin with.
        self.app.get(global_url, status=404)
        listing = self.app.get(meta_url)
        self.assertEqual(listing.json, [])

        # After a PUT the record is both listed and readable.
        self.retry_put_json(global_url, {"payload": "blob"})
        listing = self.app.get(meta_url)
        self.assertEqual(listing.json, ["global"])
        record = self.app.get(global_url)
        self.assertEqual(record.json["payload"], "blob")

        # It should not have extra keys.
        self.assertEqual(sorted(record.json), ["id", "modified", "payload"])

        # The raw body must show "modified" with exactly two decimal digits.
        modified_re = r"['\"]modified['\"]:\s*[0-9]+\.[0-9][0-9]\s*[,}]"
        raw_body = record.body.decode("utf-8")
        self.assertTrue(re.search(modified_re, raw_body))

        # Any client-specified "modified" field should be ignored.
        overwrite = self.retry_put_json(
            global_url,
            {"payload": "blob", "modified": 12},
        )
        server_ts = float(overwrite.headers["X-Weave-Timestamp"])
        record = self.app.get(global_url)
        self.assertEqual(record.json["modified"], server_ts)

    def test_that_404_responses_have_a_json_body(self):
        """An unknown URL must give a 404 whose JSON body is the value 0."""
        missing = self.app.get(self.root + "/nonexistent/url", status=404)
        self.assertEqual(missing.content_type, "application/json")
        self.assertEqual(missing.json, 0)

    def test_that_internal_server_fields_are_not_echoed(self):
        """Returned BSOs must not contain ``payload_size`` or ``ttl``."""

        def assert_public_fields_only(bso):
            # "id" and "payload" are required; the other two must be absent.
            self.assertIn("id", bso)
            self.assertIn("payload", bso)
            self.assertNotIn("payload_size", bso)
            self.assertNotIn("ttl", bso)

        # Store one item through a batch POST and one through a PUT.
        self.retry_post_json(
            self.root + "/storage/xxx_col1", [{"id": "one", "payload": "blob"}]
        )
        self.retry_put_json(self.root + "/storage/xxx_col1/two", {"payload": "blub"})

        # Check the full collection listing...
        listing = self.app.get(self.root + "/storage/xxx_col1?full=1")
        self.assertEqual(len(listing.json), 2)
        for entry in listing.json:
            assert_public_fields_only(entry)

        # ...and each item fetched on its own.
        for bso_id in ("one", "two"):
            single = self.app.get(self.root + "/storage/xxx_col1/" + bso_id)
            assert_public_fields_only(single.json)

    def test_accessing_info_collections_with_an_expired_token(self):
        """``/info/collections`` must stay readable with an expired token."""
        # This can't be run against a live server because we
        # have to forge an auth token to test things properly.
        if self.distant:
            pytest.skip("Test cannot be run against a live server.")

        coll_url = self.root + "/storage/xxx_col1"

        def assert_info_collections(expected_ts):
            # The info view must list exactly xxx_col1 with the given timestamp.
            info = self.app.get(self.root + "/info/collections")
            self.assertEqual(list(info.json.keys()), ["xxx_col1"])
            self.assertEqual(info.json["xxx_col1"], expected_ts)

        # Write some items while we've got a good token.
        seed = [{"id": str(n).zfill(2), "payload": "xxx"} for n in range(3)]
        upload = self.retry_post_json(coll_url, seed)
        written_at = float(upload.headers["X-Last-Modified"])

        # Check that we can read the info correctly.
        assert_info_collections(written_at)

        # Forge a token that expired a minute ago and switch over to it.
        auth_policy = self.config.registry.getUtility(IAuthenticationPolicy)
        secret = auth_policy._get_token_secrets(self.host_url)[-1]
        token_manager = tokenlib.TokenManager(secret=secret)
        expired_at = time.time() - 60
        claims = {
            "uid": self.user_id,
            "node": self.host_url,
            "expires": expired_at,
            "hashed_fxa_uid": self.hashed_fxa_uid,
            "fxa_uid": self.fxa_uid,
            "fxa_kid": self.fxa_kid,
        }
        self.auth_token = token_manager.make_token(claims)
        self.auth_secret = token_manager.get_derived_secret(self.auth_token)

        # The expired token cannot be used for normal operations.
        rejected = [{"id": str(n).zfill(2), "payload": "aaa"} for n in range(3)]
        self.retry_post_json(coll_url, rejected, status=401)
        self.app.get(coll_url, status=401)

        # But it still allows access to /info/collections.
        assert_info_collections(written_at)

    def test_pagination_with_newer_and_sort_by_oldest(self):
        """Page through ``newer=`` + ``sort=oldest`` results at several limits."""
        # Twelve bsos with three different modification times: they are
        # uploaded in batches of four.
        total = 12
        batch_marks = []  # (number of the last bso in the batch, batch timestamp)
        pending = []
        for n in range(total):
            pending.append({"id": str(n).zfill(2), "payload": "x"})
            if n % 4 != 3:
                continue
            upload = self.retry_post_json(self.root + "/storage/xxx_col2", pending)
            batch_marks.append((n, float(upload.headers["X-Last-Modified"])))
            pending = []

        # Try with several different pagination sizes,
        # to hit various boundary conditions.
        for limit in (2, 3, 4, 5, 6):
            for last_in_batch, threshold in batch_marks:
                query_url = self.root + "/storage/xxx_col2?full=true&sort=oldest"
                query_url += "&newer=%s&limit=%s" % (threshold, limit)

                # First page: nothing precedes the very first item, so the
                # ordering check only starts with the second one.
                fetched = []
                page = self.app.get(query_url)
                for entry in page.json:
                    if fetched:
                        assert fetched[-1]["modified"] <= entry["modified"]
                    fetched.append(entry)

                # Follow-up pages, for as long as the server hands out an offset.
                cursor = page.headers.get("X-Weave-Next-Offset")
                while cursor is not None:
                    page = self.app.get(query_url + "&offset=" + cursor)
                    for entry in page.json:
                        assert fetched[-1]["modified"] <= entry["modified"]
                        fetched.append(entry)
                    cursor = page.headers.get("X-Weave-Next-Offset")

                # Every item *after* the batch that supplied the newer=
                # timestamp must have been returned, and nothing else.
                returned_ids = sorted(int(entry["id"]) for entry in fetched)
                expected_ids = list(range(last_in_batch + 1, total))
                self.assertEqual(returned_ids, expected_ids)

    def test_pagination_with_older_and_sort_by_newest(self):
        """Page through `older=` + `sort=newest` results at several limits.

        Twelve BSOs are uploaded in three POSTs of four, giving three upload
        timestamps.  For every (timestamp, page size) pair the whole result
        set is fetched by following X-Weave-Next-Offset; it must be ordered
        newest-first and consist of exactly the items uploaded beforehand.
        """
        total_items = 12
        collection = self.root + "/storage/xxx_col2"

        # Upload in groups of four, recording (first id in group, timestamp).
        checkpoints = []
        pending = []
        for idx in range(total_items):
            pending.append({"id": str(idx).zfill(2), "payload": "x"})
            if idx % 4 != 3:
                continue
            posted = self.retry_post_json(collection, pending)
            checkpoints.append((idx - 3, float(posted.headers["X-Last-Modified"])))
            pending = []

        # Several different page sizes, to hit various boundary conditions.
        for page_size in range(2, 7):
            for first_id, stamp in checkpoints:
                page_url = "{0}?full=true&sort=newest&older={1}&limit={2}".format(
                    collection, stamp, page_size
                )

                # First page: nothing to compare the very first record with.
                fetched = []
                page = self.app.get(page_url)
                for record in page.json:
                    if fetched:
                        assert fetched[-1]["modified"] >= record["modified"]
                    fetched.append(record)

                # Follow-up pages, for as long as the server offers an offset.
                offset = page.headers.get("X-Weave-Next-Offset")
                while offset is not None:
                    page = self.app.get(page_url + "&offset=" + offset)
                    for record in page.json:
                        assert fetched[-1]["modified"] >= record["modified"]
                        fetched.append(record)
                    offset = page.headers.get("X-Weave-Next-Offset")

                # Everything *before* the group whose timestamp was used for
                # older= must be present, and nothing else.
                seen_ids = sorted(int(record["id"]) for record in fetched)
                self.assertEqual(seen_ids, list(range(0, first_id)))

    def assertCloseEnough(self, val1, val2, delta=0.05):
        """Return True when `val1` and `val2` differ by less than `delta`.

        Raises AssertionError, with a message showing both values, their
        absolute difference and `delta`, otherwise.
        """
        gap = abs(val1 - val2)
        if not (gap < delta):
            raise AssertionError(
                "abs(%.2f - %.2f) = %.2f > %.2f" % (val1, val2, gap, delta)
            )
        return True

    def test_batches(self):
        """Exercise the batch-upload lifecycle on a single collection.

        Checks that items in a pending batch are not visible and do not
        change the collection's X-Last-Modified, that committing applies the
        pending items together with the items in the commit POST, and that
        a commit POST with an empty body still applies the pending items.
        """
        endpoint = self.root + "/storage/xxx_col2"

        # Seed the collection with two ordinary (non-batch) records.
        bso1 = {"id": "12", "payload": "elegance"}
        bso2 = {"id": "13", "payload": "slovenly"}
        bsos = [bso1, bso2]
        self.retry_post_json(endpoint, bsos)

        resp = self.app.get(endpoint + "/12")
        orig_modified = resp.headers["X-Last-Modified"]

        # Open a batch holding two new records.
        bso3 = {"id": "a", "payload": "internal"}
        bso4 = {"id": "b", "payload": "pancreas"}
        resp = self.retry_post_json(endpoint + "?batch=true", [bso3, bso4])
        batch = resp.json["batch"]

        # The collection should not be reported as modified.
        self.assertEqual(orig_modified, resp.headers["X-Last-Modified"])

        # And reading from it shouldn't show the new records yet.
        resp = self.app.get(endpoint)
        res = resp.json
        res.sort()
        self.assertEqual(res, ["12", "13"])
        self.assertEqual(int(resp.headers["X-Weave-Records"]), 2)
        self.assertEqual(orig_modified, resp.headers["X-Last-Modified"])

        # Commit the batch, sending three more records in the commit POST;
        # "13" overwrites a record that already exists in the collection.
        bso5 = {"id": "c", "payload": "tinsel"}
        bso6 = {"id": "13", "payload": "portnoy"}
        bso0 = {"id": "14", "payload": "itsybitsy"}
        commit = "?batch={0}&commit=true".format(batch)
        resp = self.retry_post_json(endpoint + commit, [bso5, bso6, bso0])
        committed = resp.json["modified"]
        self.assertEqual(committed, float(resp.headers["X-Last-Modified"]))

        # make sure /info/collections got updated
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(float(resp.headers["X-Last-Modified"]), committed)
        self.assertEqual(resp.json["xxx_col2"], committed)

        # make sure the changes applied
        resp = self.app.get(endpoint)
        res = resp.json
        res.sort()
        self.assertEqual(res, ["12", "13", "14", "a", "b", "c"])
        self.assertEqual(int(resp.headers["X-Weave-Records"]), 6)
        resp = self.app.get(endpoint + "/13")
        self.assertEqual(resp.json["payload"], "portnoy")
        self.assertEqual(committed, float(resp.headers["X-Last-Modified"]))
        self.assertEqual(committed, resp.json["modified"])
        resp = self.app.get(endpoint + "/c")
        self.assertEqual(resp.json["payload"], "tinsel")
        self.assertEqual(committed, resp.json["modified"])
        resp = self.app.get(endpoint + "/14")
        self.assertEqual(resp.json["payload"], "itsybitsy")
        self.assertEqual(committed, resp.json["modified"])

        # empty commit POST
        bso7 = {"id": "a", "payload": "burrito"}
        bso8 = {"id": "e", "payload": "chocolate"}
        resp = self.retry_post_json(endpoint + "?batch=true", [bso7, bso8])
        batch = resp.json["batch"]
        # Put some time between the upload and the commit.
        time.sleep(1)
        commit = "?batch={0}&commit=true".format(batch)

        resp1 = self.retry_post_json(endpoint + commit, [])
        committed = resp1.json["modified"]
        self.assertEqual(committed, float(resp1.headers["X-Last-Modified"]))

        # Both pending records must carry the commit's timestamp.
        resp2 = self.app.get(endpoint + "/a")
        self.assertEqual(committed, float(resp2.headers["X-Last-Modified"]))
        self.assertEqual(committed, resp2.json["modified"])
        self.assertEqual(resp2.json["payload"], "burrito")

        resp3 = self.app.get(endpoint + "/e")
        self.assertEqual(committed, resp3.json["modified"])

    def test_aaa_batch_commit_collision(self):
        """The same BSO id may appear in a pending batch and in its commit.

        This is a bit of a problem for spanner because of conflicting ways
        that the data is written to the database and the discoverability of
        ids in previously submitted batches.  The commit must succeed and
        the payload sent with the commit POST must be the one stored.
        """
        endpoint = self.root + "/storage/xxx_col2"
        first_payload = "Letting the days go by"
        final_payload = "Same as it ever was"

        # Stage "b0" in a fresh batch.
        staged = self.retry_post_json(
            endpoint + "?batch=true", [{"id": "b0", "payload": first_payload}]
        )
        batch_id = staged.json["batch"]

        # Commit the batch while sending "b0" again with a new payload.
        commit_url = "{}?batch={}&commit=true".format(endpoint, batch_id)
        resp = self.retry_post_json(
            commit_url, [{"id": "b0", "payload": final_payload}]
        )

        # This should succeed, using the newer payload value.
        assert resp.json["failed"] == {}, "batch commit failed"
        assert resp.json["success"] == ["b0"], "batch commit id incorrect"
        stored = self.app.get(endpoint + "?full=1")
        assert stored.json[0].get("payload") == final_payload, "wrong payload returned"

    def test_we_dont_need_no_stinkin_batches(self):
        """Malformed batch requests must be answered with 400."""
        collection = self.root + "/storage/xxx_col2"

        # An invalid batch id.
        record = {"id": "f", "payload": "pantomime"}
        self.retry_post_json(collection + "?batch=sammich", [record], status=400)

        # A commit that does not name any batch.
        self.retry_post_json(collection + "?commit=true", [], status=400)

    def test_batch_size_limits(self):
        """Check the byte-size limits advertised by /info/configuration.

        Verifies that every expected limit key is advertised, that a batch
        POST whose payloads total exactly `max_post_bytes` is accepted while
        one extra byte fails the oversized item with "retry bytes", and that
        an `X-Weave-Total-Bytes` header above `max_total_bytes` is rejected
        with WEAVE_SIZE_LIMIT_EXCEEDED.
        """
        limits = self.app.get(self.root + "/info/configuration").json
        for limit_name in (
            "max_post_records",
            "max_post_bytes",
            "max_total_records",
            "max_total_bytes",
            "max_record_payload_bytes",
            "max_request_bytes",
        ):
            self.assertTrue(limit_name in limits)

        endpoint = self.root + "/storage/xxx_col2?batch=true"
        # NOTE: the checks in the commented-out section below are disabled.
        #
        # There are certain obvious constraints on these limits,
        # violations of which would be very confusing for clients.
        #
        #         self.assertTrue(
        #             limits['max_request_bytes'] > limits['max_post_bytes']
        #         )
        #         self.assertTrue(
        #             limits['max_post_bytes'] >= limits['max_record_payload_bytes']
        #         )
        #         self.assertTrue(
        #             limits['max_total_records'] >= limits['max_post_records']
        #         )
        #         self.assertTrue(
        #             limits['max_total_bytes'] >= limits['max_post_bytes']
        #         )
        #
        #         # `max_post_records` is an (inclusive) limit on
        #         # the number of items in a single post.
        #
        #         res = self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Records': str(limits['max_post_records'])
        #         })
        #         self.assertFalse(res.json['failed'])
        #         res = self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Records': str(limits['max_post_records'] + 1)
        #         }, status=400)
        #         self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
        #
        #         bsos = [{'id': str(x), 'payload': ''}
        #                 for x in range(limits['max_post_records'])]
        #         res = self.retry_post_json(endpoint, bsos)
        #         self.assertFalse(res.json['failed'])
        #         bsos.append({'id': 'toomany', 'payload': ''})
        #         res = self.retry_post_json(endpoint, bsos)
        #         self.assertEqual(res.json['failed']['toomany'], 'retry bso')
        #
        #         # `max_total_records` is an (inclusive) limit on the
        #         # total number of items in a batch.  We can only enforce
        #         # it if the client tells us this via header.
        #
        #         self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Total-Records': str(limits['max_total_records'])
        #         })
        #         res = self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Total-Records': str(limits['max_total_records'] + 1)
        #         }, status=400)
        #         self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
        #
        #         # `max_post_bytes` is an (inclusive) limit on the
        #         # total size of payloads in a single post.
        #
        #         self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Bytes': str(limits['max_post_bytes'])
        #         })
        #         res = self.retry_post_json(endpoint, [], headers={
        #             'X-Weave-Bytes': str(limits['max_post_bytes'] + 1)
        #         }, status=400)
        #         self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)

        # Two payloads that together weigh exactly `max_post_bytes`.
        small = {"id": "little", "payload": "XXX"}
        large = {"id": "big", "payload": "X" * (limits["max_post_bytes"] - 3)}
        res = self.retry_post_json(endpoint, [small, large])
        self.assertFalse(res.json["failed"])

        # One byte more and the big record no longer fits.
        large["payload"] += "X"
        res = self.retry_post_json(endpoint, [small, large])
        self.assertEqual(res.json["success"], ["little"])
        self.assertEqual(res.json["failed"]["big"], "retry bytes")

        # `max_total_bytes` is an (inclusive) limit on the
        # total size of all payloads in a batch.  We can only enforce
        # it if the client tells us this via header.
        at_limit = {"X-Weave-Total-Bytes": str(limits["max_total_bytes"])}
        self.retry_post_json(endpoint, [], headers=at_limit)

        over_limit = {"X-Weave-Total-Bytes": str(limits["max_total_bytes"] + 1)}
        res = self.retry_post_json(endpoint, [], headers=over_limit, status=400)
        self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)

    def text_max_total_records(self):
        """Check that the advertised `max_total_records` limit is enforced.

        `max_total_records` is an (inclusive) limit on the total number of
        items in a batch.  It is reported by the `/info/configuration`
        endpoint, is expected here to be 1664, and relates directly to the
        `X-Weave-Total-Records` request header.

        NOTE(review): the `text_` prefix (rather than `test_`) keeps pytest
        from collecting this method; presumably it is parked on purpose --
        confirm before renaming it.
        """
        endpoint = self.root + "/storage/xxx_col2"
        endpoint_batch = endpoint + "?batch=true"

        # Work on the decoded JSON body: the response object itself cannot
        # be indexed by configuration key.
        limits = self.app.get(self.root + "/info/configuration").json
        try:
            max_total_records = limits["max_total_records"]
        except KeyError:
            # The limit is not advertised.  Skip when running against a live
            # server, otherwise surface the missing key as an error.
            if self.distant:
                pytest.skip("Test cannot be run against a live server.")
            raise
        self.assertEqual(max_total_records, 1664)

        # We can only enforce it if the client tells us this via the
        # 'X-Weave-Total-Records' header.
        self.retry_post_json(
            endpoint_batch,
            [],
            headers={"X-Weave-Total-Records": str(max_total_records)},
        )
        res = self.retry_post_json(
            endpoint_batch,
            [],
            headers={"X-Weave-Total-Records": str(max_total_records + 1)},
            status=400,
        )
        self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)

        # Success case within limit
        bsos = [{"id": str(i), "payload": "X"} for i in range(max_total_records)]
        resp = self.retry_post_json(endpoint_batch, bsos)
        batch = resp.json["batch"]
        resp = self.retry_post_json(f"{endpoint}?batch={batch}&commit=true", [])
        self.assertEqual(resp.status_code, 200)
        # `modified` is a timestamp, not a record count, so compare it with
        # the response's own X-Last-Modified header.
        self.assertEqual(
            resp.json["modified"], float(resp.headers["X-Last-Modified"])
        )

        # Fail case above limit (+1)
        bsos = [
            {"id": str(i), "payload": "X"} for i in range(max_total_records + 1)
        ]
        resp = self.retry_post_json(endpoint_batch, bsos)
        batch = resp.json["batch"]
        # status=400 is required: without it the test app raises on the
        # error response before the assertions below can run.
        resp = self.retry_post_json(
            f"{endpoint}?batch={batch}&commit=true", [], status=400
        )
        # NOTE(review): the shape of this 400 body is not visible from here;
        # other size-limit rejections in this file return the bare integer
        # WEAVE_SIZE_LIMIT_EXCEEDED -- confirm which form applies.
        self.assertIn("error", resp.json)
        self.assertEqual(resp.status_code, 400)

    def test_batch_partial_update(self):
        """A batch that updates one record and adds another applies on commit.

        Before the commit the stored records are unchanged; afterwards the
        updated and added records carry the commit timestamp, the untouched
        record keeps its original one, and a field the batch did not mention
        (`sortindex`) is preserved on the updated record.
        """
        collection = self.root + "/storage/xxx_col2"

        def records_by_id():
            # Full records of the collection, ordered by id.
            return sorted(
                self.app.get(collection + "?full=1").json,
                key=lambda bso: bso["id"],
            )

        seeded = self.retry_post_json(
            collection,
            [
                {"id": "a", "payload": "aai"},
                {"id": "b", "payload": "bee", "sortindex": 17},
            ],
        )
        orig_ts = float(seeded.headers["X-Last-Modified"])

        # Update one, and add a new one.
        staged = self.retry_post_json(
            collection + "?batch=true",
            [
                {"id": "b", "payload": "bii"},
                {"id": "c", "payload": "sea"},
            ],
        )
        batch = staged.json["batch"]
        self.assertEqual(orig_ts, float(staged.headers["X-Last-Modified"]))

        # The updated item hasn't been written yet.
        before = records_by_id()
        self.assertEqual(len(before), 2)
        self.assertEqual(before[0]["payload"], "aai")
        self.assertEqual(before[1]["payload"], "bee")
        self.assertEqual(before[0]["modified"], orig_ts)
        self.assertEqual(before[1]["modified"], orig_ts)
        self.assertEqual(before[1]["sortindex"], 17)

        commit_url = collection + "?batch={0}&commit=true".format(batch)
        committed = self.retry_post_json(commit_url, [])
        commit_ts = float(committed.headers["X-Last-Modified"])

        # The changes have now been applied.
        after = records_by_id()
        self.assertEqual(len(after), 3)
        self.assertEqual(after[0]["payload"], "aai")
        self.assertEqual(after[1]["payload"], "bii")
        self.assertEqual(after[2]["payload"], "sea")
        self.assertEqual(after[0]["modified"], orig_ts)
        self.assertEqual(after[1]["modified"], commit_ts)
        self.assertEqual(after[2]["modified"], commit_ts)

        # Fields not touched by the batch should have been preserved.
        self.assertEqual(after[1]["sortindex"], 17)

    def test_batch_ttl_update(self):
        """TTL-only updates sent through a batch leave payloads untouched.

        After the commit all three payloads are unchanged; once the two
        second ttl has elapsed only the record that got no ttl remains.
        """
        collection = self.root + "/storage/xxx_col2"
        self.retry_post_json(
            collection,
            [
                {"id": "a", "payload": "ayy"},
                {"id": "b", "payload": "bea"},
                {"id": "c", "payload": "see"},
            ],
        )

        # Bump ttls as a series of individual batch operations.
        opened = self.retry_post_json(collection + "?batch=true", [], status=202)
        baseline_ts = float(opened.headers["X-Last-Modified"])
        batch_url = collection + "?batch={0}".format(opened.json["batch"])

        for bso_id in ("a", "b"):
            step = self.retry_post_json(
                batch_url, [{"id": bso_id, "ttl": 2}], status=202
            )
            # Pending batch items must not move the collection timestamp.
            self.assertEqual(baseline_ts, float(step.headers["X-Last-Modified"]))
        self.retry_post_json(batch_url + "&commit=true", [], status=200)

        # The payloads should be unchanged.
        records = sorted(
            self.app.get(collection + "?full=1").json, key=lambda bso: bso["id"]
        )
        self.assertEqual(len(records), 3)
        self.assertEqual(records[0]["payload"], "ayy")
        self.assertEqual(records[1]["payload"], "bea")
        self.assertEqual(records[2]["payload"], "see")

        # If we wait, the ttls should kick in.
        time.sleep(2.1)
        survivors = self.app.get(collection + "?full=1").json
        self.assertEqual(len(survivors), 1)
        self.assertEqual(survivors[0]["payload"], "see")

    def test_batch_ttl_is_based_on_commit_timestamp(self):
        """A batched item's ttl counts from the commit, not from the upload.

        An item with a three second ttl is staged, the commit is delayed by
        1.5 seconds, and the item must still be listed 1.6 seconds after the
        commit but gone a further 1.6 seconds later.
        """
        collection = self.root + "/storage/xxx_col2"

        opened = self.retry_post_json(collection + "?batch=true", [], status=202)
        batch_url = collection + "?batch={0}".format(opened.json["batch"])
        self.retry_post_json(batch_url, [{"id": "a", "ttl": 3}], status=202)

        # Put some time between upload timestamp and commit timestamp.
        time.sleep(1.5)

        self.retry_post_json(batch_url + "&commit=true", [], status=200)

        # Wait a little; if ttl is taken from the time of the commit
        # then it should not kick in just yet.
        time.sleep(1.6)
        listed = self.app.get(collection).json
        self.assertEqual(len(listed), 1)
        self.assertEqual(listed[0], "a")

        # Wait some more, and the ttl should kick in.
        time.sleep(1.6)
        listed = self.app.get(collection).json
        self.assertEqual(len(listed), 0)

    def test_batch_with_immediate_commit(self):
        """`?batch=true&commit=true` writes the posted items straight away.

        The response carries `modified` but no `batch` id, and that timestamp
        is what /info/collections and the collection itself then report.
        """
        collection = self.root + "/storage/xxx_col2"
        payloads = {"a": "aih", "b": "bie", "c": "cee"}
        bsos = [{"id": key, "payload": payloads[key]} for key in ("a", "b", "c")]

        posted = self.retry_post_json(
            collection + "?batch=true&commit=true", bsos, status=200
        )
        self.assertNotIn("batch", posted.json)
        self.assertIn("modified", posted.json)
        committed = posted.json["modified"]

        info = self.app.get(self.root + "/info/collections")
        self.assertEqual(float(info.headers["X-Last-Modified"]), committed)
        self.assertEqual(info.json["xxx_col2"], committed)

        listing = self.app.get(collection + "?full=1")
        self.assertEqual(float(listing.headers["X-Last-Modified"]), committed)
        records = sorted(listing.json, key=lambda bso: bso["id"])
        self.assertEqual(len(records), 3)
        self.assertEqual(records[0]["payload"], "aih")
        self.assertEqual(records[1]["payload"], "bie")
        self.assertEqual(records[2]["payload"], "cee")

    def test_batch_uploads_properly_update_info_collections(self):
        """Committed batches move the timestamps shown by /info/collections.

        Two collections are seeded, then each is written through an
        immediately committed batch; after every write the per-collection
        timestamps and the overall X-Last-Modified header are re-checked.
        """
        collection1 = self.root + "/storage/xxx_col1"
        collection2 = self.root + "/storage/xxx_col2"
        bsos = [
            {"id": "a", "payload": "aih"},
            {"id": "b", "payload": "bie"},
            {"id": "c", "payload": "cee"},
        ]

        def check_info(latest, col1_ts, col2_ts):
            # Compare /info/collections against the expected timestamps.
            info = self.app.get(self.root + "/info/collections")
            self.assertEqual(float(info.headers["X-Last-Modified"]), latest)
            self.assertEqual(info.json["xxx_col1"], col1_ts)
            self.assertEqual(info.json["xxx_col2"], col2_ts)

        ts1 = self.retry_post_json(collection1, bsos).json["modified"]
        ts2 = self.retry_post_json(collection2, bsos).json["modified"]
        check_info(ts2, ts1, ts2)

        # Overwrite in place, timestamp should change.
        overwrite = self.retry_post_json(
            collection2 + "?batch=true&commit=true", bsos[:2]
        )
        self.assertTrue(overwrite.json["modified"] > ts2)
        ts2 = overwrite.json["modified"]
        check_info(ts2, ts1, ts2)

        # Add new items, timestamp should change.
        addition = self.retry_post_json(
            collection1 + "?batch=true&commit=true",
            [{"id": "d", "payload": "dee"}],
        )
        self.assertTrue(addition.json["modified"] > ts1)
        self.assertTrue(addition.json["modified"] >= ts2)
        ts1 = addition.json["modified"]
        check_info(ts1, ts1, ts2)

    def test_batch_with_failing_bsos(self):
        """Invalid items in a batch fail individually; valid ones are committed.

        Each of the two POSTs (batch creation and commit) carries one valid
        and one invalid item.  Both must report one success and one failure,
        and after the commit only the two valid items are stored.
        """
        collection = self.root + "/storage/xxx_col2"

        # First POST: "b\n" is not an acceptable id.
        staged = self.retry_post_json(
            collection + "?batch=true",
            [
                {"id": "a", "payload": "aai"},
                {"id": "b\n", "payload": "i am invalid", "sortindex": 17},
            ],
        )
        self.assertEqual(len(staged.json["failed"]), 1)
        self.assertEqual(len(staged.json["success"]), 1)
        batch = staged.json["batch"]

        # Commit POST: "d" carries a negative ttl.
        commit_url = collection + "?batch={0}&commit=true".format(batch)
        committed = self.retry_post_json(
            commit_url,
            [
                {"id": "c", "payload": "sea"},
                {"id": "d", "payload": "dii", "ttl": -12},
            ],
        )
        self.assertEqual(len(committed.json["failed"]), 1)
        self.assertEqual(len(committed.json["success"]), 1)

        # To correctly match semantics of batchless POST, the batch
        # should be committed including only the successful items.
        # It is the client's responsibility to detect that some items
        # failed, and decide whether to commit the batch.
        records = sorted(
            self.app.get(collection + "?full=1").json, key=lambda bso: bso["id"]
        )
        self.assertEqual(len(records), 2)
        self.assertEqual(records[0]["payload"], "aai")
        self.assertEqual(records[1]["payload"], "sea")

    def test_batch_id_is_correctly_scoped_to_a_collection(self):
        """A batch id issued for one collection is rejected on another.

        Appending to or committing the batch through a different collection
        must fail with 400, while the batch stays usable in its own
        collection.
        """
        collection1 = self.root + "/storage/xxx_col1"
        extra = {"id": "d", "payload": "dii"}

        staged = self.retry_post_json(
            collection1 + "?batch=true",
            [
                {"id": "a", "payload": "aih"},
                {"id": "b", "payload": "bie"},
                {"id": "c", "payload": "cee"},
            ],
        )
        batch = staged.json["batch"]

        # I should not be able to add to that batch in a different collection.
        foreign_url = self.root + "/storage/xxx_col2?batch={0}".format(batch)
        self.retry_post_json(foreign_url, [extra], status=400)

        # I should not be able to commit that batch in a different collection.
        self.retry_post_json(foreign_url + "&commit=true", [], status=400)

        # I should still be able to use the batch in the correct collection.
        own_url = collection1 + "?batch={0}".format(batch)
        self.retry_post_json(own_url, [extra])
        self.retry_post_json(own_url + "&commit=true", [])

        records = sorted(
            self.app.get(collection1 + "?full=1").json, key=lambda bso: bso["id"]
        )
        self.assertEqual(len(records), 4)
        self.assertEqual(records[0]["payload"], "aih")
        self.assertEqual(records[1]["payload"], "bie")
        self.assertEqual(records[2]["payload"], "cee")
        self.assertEqual(records[3]["payload"], "dii")

    def test_users_with_the_same_batch_id_get_separate_data(self):
        """Two users holding the same batch id must not see each other's data.

        Tries up to 100 times to get the same batch id issued to two users;
        the test is skipped when no attempt produces a collision.
        """
        create_path = "/storage/xxx_col1?batch=true"
        commit_path = "/storage/xxx_col1?batch={0}&commit=true"
        list_path = "/storage/xxx_col1"

        # It might take a couple of attempts to get matching batch ids...
        for _ in range(100):
            first = self.retry_post_json(
                self.root + create_path, [{"id": "a", "payload": "aih"}]
            )
            batch1 = first.json["batch"]

            with self._switch_user():
                second = self.retry_post_json(
                    self.root + create_path, [{"id": "b", "payload": "bee"}]
                )
                batch2 = second.json["batch"]
                # Let the second user commit their batch.
                self.retry_post_json(self.root + commit_path.format(batch2), [])
                # It should only have a single item.
                listed = self.app.get(self.root + list_path)
                self.assertEqual(listed.json, ["b"])

            # Now have the first user commit their batch.
            self.retry_post_json(self.root + commit_path.format(batch1), [])
            # It should only have a single item.
            listed = self.app.get(self.root + list_path)
            self.assertEqual(listed.json, ["a"])

            # Stop once a conflict was produced; otherwise try again.
            if batch1 == batch2:
                break
        else:
            pytest.skip("failed to generate conflicting batchid")

    def test_that_we_dont_resurrect_committed_batches(self):
        """A re-used batch id must not bring back an already committed batch.

        The retry loop tries to trigger a situation where we:
          * create a batch with a single item
          * successfully commit that batch
          * create a new batch that re-uses the same batch id
        The test is skipped when no re-use happens within 100 attempts.
        """
        for _ in range(100):
            # First batch: one item in xxx_col1, committed with an empty POST.
            first = self.retry_post_json(
                self.root + "/storage/xxx_col1?batch=true",
                [{"id": "i", "payload": "aye"}],
            )
            batch1 = first.json["batch"]
            self.retry_post_json(
                self.root
                + "/storage/xxx_col1?batch={0}&commit=true".format(batch1),
                [],
            )

            # Second batch: opened empty in xxx_col2, item sent with the commit.
            second = self.retry_post_json(
                self.root + "/storage/xxx_col2?batch=true", []
            )
            batch2 = second.json["batch"]
            self.retry_post_json(
                self.root
                + "/storage/xxx_col2?batch={0}&commit=true".format(batch2),
                [{"id": "j", "payload": "jay"}],
            )

            # Retry if we failed to trigger re-use of the batchid.
            if batch1 == batch2:
                break
        else:
            pytest.skip("failed to trigger re-use of batchid")

        # Despite having the same batchid, the second batch should
        # be completely independent of the first.
        listed = self.app.get(self.root + "/storage/xxx_col2")
        self.assertEqual(listed.json, ["j"])

    def test_batch_id_is_correctly_scoped_to_a_user(self):
        """A batch id issued to one user is rejected for another user.

        Appending to or committing the batch as a different user must fail
        with 400, while the batch stays usable for the original user.
        """
        collection = self.root + "/storage/xxx_col1"
        extra = {"id": "d", "payload": "di"}

        staged = self.retry_post_json(
            collection + "?batch=true",
            [
                {"id": "a", "payload": "aih"},
                {"id": "b", "payload": "bie"},
                {"id": "c", "payload": "cee"},
            ],
        )
        batch = staged.json["batch"]

        with self._switch_user():
            # I should not be able to add to that batch as a different user.
            foreign_url = self.root + "/storage/xxx_col1?batch={0}".format(batch)
            self.retry_post_json(foreign_url, [extra], status=400)

            # I should not be able to commit that batch as a different user.
            self.retry_post_json(foreign_url + "&commit=true", [], status=400)

        # I should still be able to use the batch in the original user.
        own_url = collection + "?batch={0}".format(batch)
        self.retry_post_json(own_url, [extra])
        self.retry_post_json(own_url + "&commit=true", [])

        records = sorted(
            self.app.get(collection + "?full=1").json, key=lambda bso: bso["id"]
        )
        self.assertEqual(len(records), 4)
        self.assertEqual(records[0]["payload"], "aih")
        self.assertEqual(records[1]["payload"], "bie")
        self.assertEqual(records[2]["payload"], "cee")
        self.assertEqual(records[3]["payload"], "di")

    # bug 1332552: make sure that ttl:null uses the default ttl
    def test_create_bso_with_null_ttl(self):
        """A BSO stored with an explicit null ttl is still readable afterwards."""
        target = self.root + "/storage/xxx_col2/TEST1"
        self.retry_put_json(target, {"payload": "x", "ttl": None})
        # Read it back a short moment after the write.
        time.sleep(0.1)
        stored = self.app.get(target + "?full=1")
        self.assertEqual(stored.json["payload"], "x")

    def test_rejection_of_known_bad_payloads(self):
        """A payload with a suspicious IV is refused only in "crypto"."""
        suspicious = json_dumps(
            {
                "ciphertext": "IDontKnowWhatImDoing",
                "IV": "AAAAAAAAAAAAAAAAAAAAAA==",
            }
        )
        bso = {"id": "keys", "payload": suspicious}
        crypto = self.root + "/storage/crypto"
        other = self.root + "/storage/xxx_col2"

        # Fishy IVs are rejected on the "crypto" collection.
        for item_id in ("keys", "blerg"):
            self.retry_put_json(crypto + "/" + item_id, bso, status=400)
        self.retry_post_json(crypto, [bso], status=400)

        # But are allowed on other collections.
        self.retry_put_json(other + "/keys", bso, status=200)
        self.retry_post_json(other, [bso], status=200)

    # bug 1397357
    def test_batch_empty_commit(self):
        """Commit a batch with various "empty" bodies and content types.

        For each case a fresh five-item batch is created and then committed
        with the given raw body; only `[]` as application/json and an empty
        body as application/newlines are expected to succeed.
        """

        def commit_with_body(content_type, body, status=200):
            # Stage five items, then commit the batch with the raw body.
            staged = self.retry_post_json(
                self.root + "/storage/xxx_col?batch=true",
                [{"id": str(n).zfill(2), "payload": "X"} for n in range(5)],
            )
            self.assertEqual(len(staged.json["success"]), 5)
            self.assertEqual(len(staged.json["failed"]), 0)
            batch = staged.json["batch"]
            self.app.post(
                self.root + "/storage/xxx_col?commit=true&batch=" + batch,
                body,
                headers={"Content-Type": content_type},
                status=status,
            )

        cases = (
            ("application/json", "[]", 200),
            ("application/json", "{}", 400),
            ("application/json", "", 400),
            ("application/newlines", "", 200),
            ("application/newlines", "\n", 400),
            ("application/newlines", "{}", 400),
            ("application/newlines", "[]", 400),
        )
        for content_type, body, expected_status in cases:
            commit_with_body(content_type, body, status=expected_status)

    def test_cors_settings_are_set(self):
        """An OPTIONS preflight reports the max-age and echoes the origin."""
        preflight_headers = {
            "Access-Control-Request-Method": "GET",
            "Origin": "localhost",
            "Access-Control-Request-Headers": "Content-Type",
        }
        res = self.app.options(
            self.root + "/__heartbeat__", headers=preflight_headers
        )

        max_age = int(res.headers["access-control-max-age"])
        self.assertEqual(max_age, 555)
        self.assertEqual(res.headers["access-control-allow-origin"], "localhost")

    def test_cors_allows_any_origin(self):
        """An OPTIONS preflight from an arbitrary origin is answered with 200."""
        preflight_headers = {
            "Access-Control-Request-Method": "GET",
            "Origin": "http://test-website.com",
            "Access-Control-Request-Headers": "Content-Type",
        }
        heartbeat = self.root + "/__heartbeat__"
        self.app.options(heartbeat, headers=preflight_headers, status=200)

    # PATCH is not a default allowed method, so request should return 405
    def test_patch_is_not_allowed(self):
        """A PATCH request on a collection is rejected with status 405."""
        collection = self.root + "/storage/xxx_col1"
        with self.assertRaises(AppError) as error:
            self.app.patch_json(collection)
        # This check must sit after the `with` block: a statement placed
        # behind the raising call inside the block would never execute.
        # The status is looked up in the message of the raised exception.
        self.assertIn("405", str(error.exception))
